qpid 接收消息编程

qpid 是符合AMQP规范的apache 许可证的消息中间件,目前在openstack中作为一种可选的消息中间件服务配置,其他还有rabbitmq和zeroMQ

如果看过qpid的编程API文档的话,会看到比较简单的一个例子, 如下(接收消息打印消息内容)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#following code is from
# http://qpid.apache.org/releases/qpid-0.24/messaging-api/python/examples/drain.html


import optparse
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN

parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
description="Drain messages from the supplied address.")
parser.add_option("-b", "--broker", default="localhost",
help="connect to specified BROKER (default %default)")
parser.add_option("-c", "--count", type="int",
help="number of messages to drain")
parser.add_option("-f", "--forever", action="store_true",
help="ignore timeout and wait forever")
parser.add_option("-r", "--reconnect", action="store_true",
help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,
help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",
help="maximum number of reconnect attempts")
parser.add_option("-t", "--timeout", type="float", default=0,
help="timeout in seconds to wait before exiting (default %default)")
parser.add_option("-p", "--print", dest="format", default="%(M)s",
help="format string for printing messages (default %default)")
parser.add_option("-v", dest="verbose", action="store_true",
help="enable logging")

opts, args = parser.parse_args()

if opts.verbose:
enable("qpid", DEBUG)
else:
enable("qpid", WARN)

if args:
addr = args.pop(0)
else:
parser.error("address is required")
if opts.forever:
timeout = None
else:
timeout = opts.timeout

class Formatter:

def __init__(self, message):
self.message = message
self.environ = {"M": self.message,
"P": self.message.properties,
"C": self.message.content}

def __getitem__(self, st):
return eval(st, self.environ)

conn = Connection(opts.broker,
reconnect=opts.reconnect,
reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
try:
conn.open()
ssn = conn.session()
rcv = ssn.receiver(addr)

count = 0
while not opts.count or count < opts.count:
try:
msg = rcv.fetch(timeout=timeout)
print opts.format % Formatter(msg)
count += 1
ssn.acknowledge()
except Empty:
break
except ReceiverError, e:
print e
except KeyboardInterrupt:
pass

conn.close()

connection和session是1对多的关系,每个session保证消息的顺序接收,让session创建对应的sender和receiver,这里我们只需要创建receiver

这个程序看起来很好,如果直接传一个地址, 比如(我们想要接收openstack glance的message)
运行如下:
python drain.py -b admin/qpid@localhost glance

如果我们想要实现连接断掉后重新自动连接,我们可以传入参数 -r
上面的程序有个问题,如果没有收到消息就断开连接了。

新需求1: 我们要实现持续的监听接收消息, 改一下

1
2
3
4
5
6
7
try:
msg = rcv.fetch(timeout=timeout)
print opts.format % Formatter(msg)
count += 1
ssn.acknowledge()
except Empty:
time.sleep(0.5)

这样就可以了。

还不行,

新需求2:我们要实现断后重新自动连接, 可以, 传入参数 -r,

解决了

问题出现了,你会发现在service qpidd restart后,程序会抛出exception
qpid.messaging.exceptions.NotFound: no such queue: glance

新需求3:显然这是由于我们创建的queue不是durable的,所以需要在qpid restart后能够正常运行,而不是退出。
这就需要我们创建的queue能够在接收到消息的时候自动创建完成,

解决方法: rcv = ssn.receiver(addr + “; {create: always}”)

这样就完成queue的按需创建

新需求4: qpid 默认的reconnect上面的是基于固定interval,我们想改变重新建立连接的算法,实现2的指数式建立连接
解决方法: 我们写入自己的自动重连方法, 一个简单的例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def reconnect():
global rcv
global ssn
attempt = 0
delay = 1
while True:
if conn.opened():
try:
conn.close()
except exceptions.ConnectionError:
pass
attempt += 1
print "The %s time attempt for reconnecting qpid server" % str(attempt)
try:
connection_init()
conn.open()
except exceptions.ConnectionError, e:
delay = min(2 * delay, 60)
time.sleep(delay)
pass
else:
break
print "qpid server reconnection created"
ssn = conn.session()
rcv = ssn.receiver(addr + "; {create: always}")

这样就OK了

总结: 经过上面的需求变化,我们新的接收消息程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import optparse
import time
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN

parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
description="Drain messages from the supplied address.")
parser.add_option("-b", "--broker", default="localhost",
help="connect to specified BROKER (default %default)")
parser.add_option("-c", "--count", type="int",
help="number of messages to drain")
parser.add_option("-f", "--forever", action="store_true",
help="ignore timeout and wait forever")
parser.add_option("-r", "--reconnect", action="store_true",
help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,
help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",
help="maximum number of reconnect attempts")
parser.add_option("-t", "--timeout", type="float", default=0,
help="timeout in seconds to wait before exiting (default %default)")
parser.add_option("-p", "--print", dest="format", default="%(M)s",
help="format string for printing messages (default %default)")
parser.add_option("-v", dest="verbose", action="store_true",
help="enable logging")

opts, args = parser.parse_args()

if opts.verbose:
enable("qpid", DEBUG)
else:
enable("qpid", WARN)

if args:
addr = args.pop(0)
else:
parser.error("address is required")
if opts.forever:
timeout = None
else:
timeout = opts.timeout

count = 0
rcv = None
conn = None
ssn = None


class Formatter:

def __init__(self, message):
self.message = message
self.environ = {"M": self.message,
"P": self.message.properties,
"C": self.message.content}

def __getitem__(self, st):
return eval(st, self.environ)


def reconnect():
global rcv
global ssn
attempt = 0
delay = 1
while True:
if conn.opened():
try:
conn.close()
except exceptions.ConnectionError:
pass
attempt += 1
print "The %s time attempt for reconnecting qpid server" % str(attempt)
try:
connection_init()
conn.open()
except exceptions.ConnectionError, e:
delay = min(2 * delay, 60)
time.sleep(delay)
pass
else:
break
print "qpid server reconnection created"
ssn = conn.session()
rcv = ssn.receiver(addr + "; {create: always}")


def fetch():
global count
while not opts.count or count < opts.count:
try:
msg = rcv.fetch(timeout=timeout)
print opts.format % Formatter(msg)
count += 1
ssn.acknowledge()
except Empty:
time.sleep(0.5)
except exceptions.ConnectionError, e:
reconnect()
except Exception, e:
print e
raise e


def connection_init():
global conn
conn = Connection(opts.broker,
reconnect=opts.reconnect,
reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)


try:
connection_init()
conn.open()
ssn = conn.session()
rcv = ssn.receiver(addr + "; {create: always}")
fetch()

except ReceiverError, e:
print e
except KeyboardInterrupt:
pass
except exceptions.ConnectionError, e:
reconnect()


conn.close()